package org.laofeng.nifi.processors;

/*
*
* */

import net.sourceforge.tess4j.ITesseract;
import net.sourceforge.tess4j.Tesseract;
import net.sourceforge.tess4j.TesseractException;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
import org.apache.nifi.annotation.behavior.SideEffectFree;

import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.*;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;

import javax.imageio.ImageIO;

import java.awt.image.BufferedImage;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;

//@SideEffectFree
//@PrimaryNodeOnly
@Tags({"ocr", "Tesseract", "image","picture","jpg", "jpeg", "gif"})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription("使用Tesseract解析图片中的文字，注意这里并没有验证输入FlowFile的Content，如果FlowFile的Content不是图片文件，会解析失败！")
@SideEffectFree
@PrimaryNodeOnly

public class OcrTesseractProcessor extends AbstractProcessor {

    public static final PropertyDescriptor _lang = new PropertyDescriptor.Builder()
            .name("lang")
            .defaultValue("chi_sim")
            .displayName("language")
            .description("简体中文，暂不支持修改！")
            .required(true)
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .build();

    public static final PropertyDescriptor _tess_lang_Path = new PropertyDescriptor.Builder()
            .name("tess lang path")
            .required(true)
            .description("Tesseract 语言包存放路径")
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .build();


    public static final PropertyDescriptor _tess_lib_Path = new PropertyDescriptor.Builder()
            .name("tess lib path")
            .required(true)
            .description("Tesseract lib路径，确保libtesseract.dylib文件在此路径中存在。")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final PropertyDescriptor _text_file_suffix = new PropertyDescriptor.Builder()
            .name("text file suffix")
            .required(true)
            .description("filename attribute suffix")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();



    private List<PropertyDescriptor> properties;
    private Set<Relationship> relationships;
    private String tessDataPath;
    private String lang;





    public static final Relationship SUCCESS = new Relationship.Builder()
            .name("SUCCESS")
            .description("Succes relationship")
            .build();


    public static final Relationship FAILURE = new Relationship.Builder()
            .name("FAILURE")
            .description("failure relationship")
            .build();



    //
    public List<PropertyDescriptor> getProperties() {
        return properties;
    }


    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }

    public void init(final ProcessorInitializationContext context){

        List<PropertyDescriptor> properties = new ArrayList<PropertyDescriptor>();
        properties.add(_lang);
        properties.add(_tess_lang_Path);
        properties.add(_tess_lib_Path);


        this.properties = Collections.unmodifiableList(properties);

        Set<Relationship> relationships = new HashSet<>();
        relationships.add(SUCCESS);
        relationships.add(FAILURE);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    @OnScheduled
    public void onScheduled(ProcessContext context) {
        this.lang = context.getProperty(_lang).getValue();
        this.tessDataPath = context.getProperty(_tess_lang_Path).getValue();
        String tess_lib_Path = context.getProperty(_tess_lib_Path).getValue();
        if (tess_lib_Path != null && !tess_lib_Path.isEmpty())
        System.setProperty("jna.library.path",tess_lib_Path);
    }
    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile ff = session.get();
        if (ff == null){
            return;
        }

        final AtomicReference<String> value = new AtomicReference<>();
        final AtomicReference<Exception> ex = new AtomicReference<>();
        session.read(ff, new InputStreamCallback() {

            public void process(InputStream in) throws IOException {
                //ByteArrayOutputStream out = new ByteArrayOutputStream();
                //IOUtils.copy(in,out);
                ITesseract instance = new Tesseract();
                instance.setDatapath(tessDataPath); //相对目录，这个时候tessdata目录和src目录平级
                if (lang != null){
                    instance.setLanguage(lang);
                }

                try {
                    String words = instance.doOCR(ImageIO.read(in));
                    value.set(words);
                } catch (TesseractException e) {
                    e.printStackTrace();
                    ex.set(e);
                }

            }
        });
        Exception e = ex.get();
        if (e != null){
            super.getLogger().error("tess orc error",e);
            session.transfer(ff, FAILURE);
            return;
        }
        String words = value.get();
        if(words != null && !words.isEmpty()){
            FlowFile new_ff = session.write(ff, new OutputStreamCallback() {
                public void process(OutputStream out) throws IOException {
                    out.write(value.get().getBytes());
                }
            });
            String file_name_key = "filename";
            String file_name = ff.getAttribute(file_name_key);
            file_name = file_name+".txt";
            session.putAttribute(new_ff,file_name_key,file_name);

            session.getProvenanceReporter().create(new_ff,"export words from images");
            session.transfer(new_ff, SUCCESS);
        } else {
            session.transfer(ff, FAILURE);
        }
    }
}
